package com.bw.gmall.realtime.dws.app;

import com.bw.gmall.realtime.common.base.BaseSqlApp;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.function.SplitFunction;
import com.bw.gmall.realtime.common.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


//流量域搜索关键词粒度页面浏览各窗口汇总表
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSqlApp {
    public static void main(String[] args) {
        new DwsTrafficSourceKeywordPageViewWindow().start(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW, 1, 10021);
    }

    @Override
    public void handle(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String groupId) {
        // 1.获取日志表搜索的数据
        tableEnv.executeSql("create table page_log(\n" +
                "  page map<String,String>,\n" +
                "  common map<String,String>,\n" +
                "  ts bigint,\n" +
                " et as to_timestamp_ltz(ts, 3), " +
                " watermark for et as et - interval '5' second " +
                ")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, Constant.TOPIC_DWD_TRAFFIC_PAGE));

        // 2.获取搜索商品的数据
        Table table = tableEnv.sqlQuery("select \n" +
                "`page`['item'] item,\n" +
                "et\n" +
                "from page_log\n" +
                "where `page`['last_page_id'] = 'search'\n" +
                "and `page`['item_type'] = 'keyword'");

        tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

        tableEnv.createTemporaryView("word", table);
        // 3. 炸开函数

        Table table1 = tableEnv.sqlQuery("select \n" +
                "  item,\n" +
                "  word,\n" +
                "  et\n" +
                "from word,lateral table(SplitFunction(item))");

        tableEnv.createTemporaryView("words", table1);
//        table1.execute().print();
        // 4. 开窗聚合



        Table result = tableEnv.sqlQuery("select " +
                " date_format(window_start, 'yyyy-MM-dd HH:mm:ss') stt, " +
                " date_format(window_end, 'yyyy-MM-dd HH:mm:ss') edt, " +
                " date_format(now(), '2024-11-20') cur_date, " +
                " word keyword," +
                " count(*) keyword_count " +
                "from table( tumble(table words, descriptor(et), interval '5' second ) ) " +
                "group by window_start, window_end, word ");

//        result.execute().print();

        // 5. 写出doris

//        create database gmall2023_realtime;
//        drop table if exists gmall2023_realtime.dws_traffic_source_keyword_page_view_window;
//        create table if not exists gmall2023_realtime.dws_traffic_source_keyword_page_view_window
//            (
//            `stt`           DATETIME COMMENT '窗口起始时间',
//            `edt`           DATETIME COMMENT '窗口结束时间',
//            `cur_date`      DATE COMMENT '当天日期',
//            `keyword`       VARCHAR(128) COMMENT '搜索关键词',
//            `keyword_count` BIGINT REPLACE COMMENT '搜索关键词出现次数'
//            ) engine = olap aggregate key (`stt`,`edt`,`cur_date`,`keyword`)
//            partition by range(`cur_date`)()
//            distributed by hash(`stt`) buckets 10
//        properties (
//                "replication_num" = "3",
//                "dynamic_partition.enable" = "true",
//                "dynamic_partition.time_unit" = "DAY",
//                "dynamic_partition.end" = "3",
//                "dynamic_partition.prefix" = "par",
//                "dynamic_partition.buckets" = "10"
//        );
        tableEnv.executeSql("CREATE TABLE dws_traffic_source_keyword_page_view_window (\n" +
                " stt  STRING,\n" +
                " edt STRING,\n" +
                " cur_date STRING,\n" +
                " keyword  STRING,\n" +
                " keyword_count BIGINT\n" +
                ")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));
//
//        // 使用doris必须开启CK
        result.insertInto(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW).execute();





    }
}